package com.hg.transaction;

import cn.hutool.core.util.StrUtil;
import com.hg.MqConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
//import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 事务消息生产者
 * created by skh on 2019/12/15
 */
@Slf4j
public class TransactionProducer {

	public static void main(String[] args) throws MQClientException, InterruptedException {
		TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
		producer.setNamesrvAddr(MqConfig.nameServerAddr);

		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("client-transaction-msg-check-thread");
				return thread;
			}
		});

		producer.setExecutorService(threadPoolExecutor);
		producer.setTransactionListener(new TransactionListener() {
			@Override
			public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
				//本地事务处理逻辑
				log.info("本地事务执行...");
				log.info("消息标签:{}", msg.getTags());
				try {
					log.info("消息内容:{}", new String(msg.getBody(),"UTF-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
				String tags = msg.getTags();
				if (tags.equals("Transaction1")) {
					log.error("模拟本地事务执行失败");
					//表示本地事务执行失败,当事务执行失败时需要返回ROLLBACK消息
					return LocalTransactionState.ROLLBACK_MESSAGE;
				}

				log.info("模拟本地事务执行成功");

				//表示本地事务执行成功
				return LocalTransactionState.COMMIT_MESSAGE;
			}

			@Override
			public LocalTransactionState checkLocalTransaction(MessageExt msg) {
				log.info("服务器调用消息回查结果");
				log.info("消息标签:{}", msg.getTags());
				try {
					log.info("消息内容:{}", new String(msg.getBody(),"UTF-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
				return LocalTransactionState.COMMIT_MESSAGE;
			}
		});
		producer.setVipChannelEnabled(false);
		producer.setSendMsgTimeout(5*60*1000);
		producer.start();
		log.info("生产者启动成功");
		//演示事务执行成功和失败的效果.
		for (int i = 0; i < 2; i++) {
			Message message = new Message("TopicTransaction", "Transaction" + i, ("hello rocketmq distribution transaction" + i).getBytes());
			TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
			System.out.println("transactionSendResult = " + transactionSendResult);
		}
	}
}
